package com.inyourcode.core.threads.iml;


import com.inyourcode.core.threads.ConsumerTask;
import com.inyourcode.core.threads.api.HashExecutor;
import com.inyourcode.core.util.NamedThreadFactory;
import com.inyourcode.core.util.StackTraceUtil;
import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.EventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executor;

/**
 * <u>固定的线程池
 * <li>支持根据hash值，将任务投递到指定的线程处理</li>
 *
 * </u>
 *
 * @author JackLei
 **/
public class HashExecutorFactory extends AbstractExecutorFactory<HashExecutor> {
    private static final Logger LOGGER = LoggerFactory.getLogger(HashExecutorFactory.class);

    @Override
    public HashExecutor newExecutor(Target target) {
        HashThreadPool hashThreadPool = new HashThreadPool(coreWorks(target), target.name());
        return new HashExecutor() {
            @Override
            public void execute(int hash, Runnable task) {
                hashThreadPool.getExecutor(hash).execute(task);
            }

            @Override
            public void crossExecute(int hash, Executor callBackExecutor, Runnable asyncTask, ConsumerTask callBackTask) {
                execute(hash, () -> {
                    try {
                        asyncTask.run();
                        callBackTask.asyncAfterHandle();
                        callBackExecutor.execute(() -> {
                            try {
                                callBackTask.accept(null);
                            } catch (Exception ex) {
                                LOGGER.error("execute callback task failed, hash:{}, exceptioni:{}", hash, StackTraceUtil.stackTrace(ex));
                            }
                        });
                    } catch (Exception ex) {
                        callBackExecutor.execute(() -> {
                            try {
                                callBackTask.accept(ex);
                            } catch (Exception ex2) {
                                LOGGER.error("execute exception callback task failed, hash:{}, exceptioni:{}", hash, StackTraceUtil.stackTrace(ex2));
                            }
                        });
                    }
                });
            }

            @Override
            public void execute(Runnable task) {
                hashThreadPool.getExecutor(0).execute(task);
            }

            @Override
            public void shutdown() {
                for (DefaultEventExecutor executor : hashThreadPool.executors) {
                    executor.shutdownGracefully();
                }
            }
        };
    }


    static class HashThreadPool {
        private DefaultEventExecutor[] executors;
        private int coreSize;
        private String poolName;

        public HashThreadPool(int coreSize, String poolName) {
            this.coreSize = coreSize;
            this.poolName = poolName;

            executors = new DefaultEventExecutor[coreSize];
            for (int i = 0; i < coreSize; i++) {
                executors[i] = new DefaultEventExecutor(new NamedThreadFactory(this.poolName));
            }
        }

        private EventExecutor getExecutor(int hash) {
            return executors[(hash % executors.length) >>> 1];
        }
    }
}
